-
Notifications
You must be signed in to change notification settings - Fork 11.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ISSUE #292] Add support of transactional message feature #358
Conversation
Related Issue: |
This is a big pull request, containing bulks of changes...Please wait patiently and I would join you as soon as possible. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
first review
} | ||
|
||
class TransactionCheckListenerBImpl implements TransactionCheckListener { | ||
//class TransactionExecuterBImpl implements LocalTransactionExecuter { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The extra commented code has been removed in the latest commit,thanks for your reminding.
|
||
public TransactionCheckListenerBImpl(boolean ischeckffalse, | ||
StatsBenchmarkTProducer statsBenchmarkTProducer) { | ||
public TransactionListenerImpl(boolean ischeckffalse, boolean isCheckLocal, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ischeckffalse ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, the last commit did not modify the original code, including variable naming, but the latest commit has been changed.
+ tranStateTableOffset + ", commitLogOffset=" + commitLogOffset + ", commitOrRollback=" | ||
+ commitOrRollback + ", fromTransactionCheck=" + fromTransactionCheck + ", msgId=" + msgId | ||
+ "]"; | ||
return "EndTransactionRequestHeader{" + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ToStringBuilder is easier way to reflect all field in one class
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I used IDEA's automatic generation tool to override the toString method.
@@ -17,7 +17,7 @@ | |||
package org.apache.rocketmq.client.producer; | |||
|
|||
import org.apache.rocketmq.common.message.Message; | |||
|
|||
@Deprecated |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
?
@@ -18,7 +18,7 @@ | |||
package org.apache.rocketmq.broker.transaction; | |||
|
|||
import java.util.List; | |||
|
|||
@Deprecated |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This interface has been deleted in the latest commit.
final Channel channel, | ||
final CheckTransactionStateRequestHeader requestHeader, | ||
final SelectMappedBufferResult selectMappedBufferResult) { | ||
final MessageExt messageExt) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this Exception be specific?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your review,in fact, I don't want to throw any exceptions here,but considering that the encode method may cause exception,so in this method just throws exception following with the encode method.
Channel channel = channelList.get(index); | ||
int count = 0; | ||
boolean isOk = channel.isActive() && channel.isWritable(); | ||
while (isOk && count++ < 3) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
3 is a magic number
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will use a constant to represent this value, which represents the number of retries for fetch an available channel.
} catch (Throwable e) { | ||
log.error("invokeProducer exception", e); | ||
selectMappedBufferResult.release(); | ||
this.brokerController.getRemotingServer().invokeOneway(channel, request, 10); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
10 ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an empirical value when using the invokeOneWay method. In the check scenario , we don't need to wait for the producer‘s response, and in order to prevent too many threads waiting,we adopted this configuration,but we also considered that whether it is necessary to make this parameter configurable,what’s your opinion?
package org.apache.rocketmq.broker.transaction; | ||
|
||
@Deprecated |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
deprecated only but missing the deadline and replacement is not a good practice
|
||
private static final int TRY_PULL_MSG_NUMBER = 1; | ||
|
||
private final int queueTime = 60000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In order to avoid a queue check taking up too much time and leading to other queues to starve, we added this configuration,but I will change the variable name to uppercase.
LGTM if a rational response to my point out. This is an awesome feature, @lizhanhui @Jaskey @shroman Could we speed up the feedback for my kindly proposer, I would like to integrate into our develop branch :-) BTW, @duhengforever , do we have a wiki for the architecture and user guide? |
@duhengforever Forget to comment, What is the coverage of the new code, over 80%? |
For transactional messages, I added some documentation, including examples and design documentation., I hope these documents can help you to review this PR, thanks! |
great job!!it seems the good things are coming soon! |
@@ -142,6 +150,9 @@ | |||
private BrokerFastFailure brokerFastFailure; | |||
private Configuration configuration; | |||
private FileWatchService fileWatchService; | |||
private TransactionalMessageCheckService transactionMsgCheckService; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use consistent naming. If abbreviation were used, so were these relevant.
public void start() { | ||
if (started.compareAndSet(false, true)) { | ||
super.start(); | ||
this.brokerController.getTransactionalMessageService().open(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why open?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Considering that in other storage media implementations, maybe need to open some related settings, so provided this method.
} | ||
|
||
@Override | ||
public void run() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should take advantage of what ServiceThread have already provided. No
while(true) {
...
sleep(xxx);
...
}
Use onWaitEnd() instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean waitForRunning() and onWaitEnd() methods? yeah, I used these two methods to transform this implementation in the latest commit.
public class TransactionalMessageServiceImpl implements TransactionalMessageService { | ||
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME); | ||
|
||
private TransactionalMessageBridge transactionBridge; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Having TransactionalMessageService --> TransactionalMessageServiceImpl abstraction, is it necesary to add another proxy?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In fact, in the development process, I have considered this problem, I personally think it is not needed, and we provide the Serviceloader to load other implementations. and if add another proxy,it will make the call hierarchy deeper and deeper, and need to add a strategy to ensure loading to the correct implementation,not convenient for later code maintenance.
return transactionBridge.putHalfMessage(messageInner); | ||
} | ||
|
||
private boolean isNeedDiscard(MessageExt msgExt, int transactionCheckMax) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isNeedXXX
naming is weird and does not follow grammar. Using needXXX suffices.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good suggestion
MessageExtBrokerInner msgInner = transactionBridge.renewHalfMessageInner(messageExt); | ||
putMessageResult = transactionBridge.putMessageReturnResult(msgInner); | ||
} catch (Exception e) { | ||
e.printStackTrace(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just printStackTrace?
We need to recover from the error and mark tx service unavailable and propagate the unavailable state to producer clients if recovering is not possible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for helping to point out the problem,output to the log will be a good choice, as in this place, The outer method uses this method to decide whether to send back the request ,based on the status of putMessageResult, so there's no need to propagate the unavailable state to producer.
|
@lizhanhui thanks for your excellent review firstly.
|
Glad to see the great work in the community, I would like to merge the pr if all comments are resolved |
congrats! |
@duhengforever Why do we not implement transactions checking based on database, and how to confirm previous transaction information if the service is down? |
index = (++index) % size; | ||
channel = channelList.get(index); | ||
return channel; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here if isOk= true will return next channel.
i think it should be like
while ( count++ < GET_AVALIABLE_CHANNEL_RETRY_COUNT) { if(isOk){ return channel; } index = (++index) % size; channel = channelList.get(index); isOk = channel.isActive() && channel.isWritable(); }
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your careful review,and I will fix this problem ASAP.
@klaus-pd In order to avoid external dependencies, such as database, we used the mechanism of rocketMQ itself to implement the check logic. and if the producer down, broker will send check request to other producers with same produceId. |
@duhengforever I know, but if only one producer of the same produceId, how to solve the problem of checking back? |
@klaus-pd broker will resend the check request when the producer restart util reached a limit. |
for (MessageExt opMessageExt : opMsg) { | ||
Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset)); | ||
log.info("Topic: {} tags: {}, OpOffset: {}, HalfOffset: {}", opMessageExt.getTopic(), | ||
opMessageExt.getTags(), opMessageExt.getQueueOffset(), queueOffset); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));
why this can get the value of HalfOffset?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Details can be found in TransactionalMessageService#deletePrepareMessage method.
Please do not create a Pull Request without creating an issue first.
What is the purpose of the change
In order to implement eventual consistency in distributed system, we define a new type of message named transactional message, it can be thought of as a two-phase commit message implementation to ensure eventual consistency in distributed system. Transactional message ensures that the execution of local transaction and the sending of message can be performed atomically.
Brief changelog
In our implementation, we made full use of the mechanism of rocketMQ itself to avoid external dependencies, and we defined a pair of message: named half message and commit/rollback message. Half message means prepared message, which are not visible to consumers,and it will be sent before local transaction is executed. After the local transaction ends,producer will send a commit or rollback message to broker based on local transaction execution result
Verifying this change
This is a trivial change.
Follow this checklist to help us incorporate your contribution quickly and easily:
[ISSUE #123] Fix UnknownException when host config not exist
. Each commit in the pull request should have a meaningful subject line and body.mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle
to make sure basic checks pass. Runmvn clean install -DskipITs
to make sure unit-test pass. Runmvn clean test-compile failsafe:integration-test
to make sure integration-test pass.